/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.http.codec.websocketcodec;

import kiss.logger;
import collie.codec.http.codec.httpcodec;
import collie.codec.http.httptansaction;
import std.bitmanip;
import collie.codec.http.codec.wsframe;
import collie.codec.http.httpmessage;
import collie.codec.http.errocode;
import std.conv;

enum FRAME_SIZE_IN_BYTES = 512 * 512 * 2; //maximum size of a frame when sending a message


/**
 * HTTPCodec implementation that parses incoming WebSocket frames and
 * serialises outgoing ones.
 *
 * Incoming bytes are fed through `onIngress`; each completed (or rejected)
 * frame is handed to the registered callback via `onWsFrame`. Outgoing
 * messages are written with `generateWsFrame`. All HTTP-specific generate*
 * methods are no-ops that return 0.
 */
class WebsocketCodec : HTTPCodec
{
    /// States of the incremental frame parser driven by `readFrame`.
    enum ProcessingState
    {
        PS_READ_HEADER,
        PS_READ_Do_LENGTH,
        PS_READ_PAYLOAD_LENGTH_1,
        PS_READ_PAYLOAD_LENGTH,
        PS_READ_BIG_PAYLOAD_LENGTH,
        PS_READ_BIG_PAYLOAD_LENGTH_1,
        PS_READ_MASK,
        PS_READ_MASK_1,
        PS_READ_PAYLOAD
    }

    /// Binds the codec to a transport direction and the transaction it serves.
    this(TransportDirection direc, HTTPTransaction txn)
    {
        _transportDirection = direc;
        _transaction = txn;
    }

    override CodecProtocol getProtocol() {
        return CodecProtocol.WEBSOCKET;
    }

    /// Reports REMOTE_CLOSED to the transaction and unhooks its handler and transport.
    override void onConnectClose()
    {
        // Work on a local reference: the error callback may end up calling
        // detach(), which nulls _transaction while we still need it.
        auto txn = _transaction;
        if(txn){
            txn.onErro(HTTPErrorCode.REMOTE_CLOSED);
            txn.handler = null;
            txn.transport = null;
        }
    }

    /// Reports TIME_OUT to the transaction, if one is still attached.
    override void onTimeOut()
    {
        if(_transaction){
            _transaction.onErro(HTTPErrorCode.TIME_OUT);
        }
    }

    /// Drops the reference to `txn` if it is the transaction this codec serves.
    override void detach(HTTPTransaction txn)
    {
        if(txn is _transaction)
            _transaction = null;
    }


    override TransportDirection getTransportDirection()
    {
        return _transportDirection;
    }

    override StreamID createStream() {
        return 0;
    }

    override bool isBusy() {
        return !_finished;
    }

    /// True once a close frame has been generated by this codec.
    override bool shouldClose()
    {
        return _shouldClose;
    }

    override void setParserPaused(bool paused){}

    override void setCallback(CallBack callback) {
        _callback = callback;
    }

    /// Feeds received bytes to the frame parser; always consumes the whole buffer.
    override size_t onIngress(ubyte[] buf)
    {
        readFrame(buf);
        return buf.length;
    }

    // The HTTP message generators below are not applicable to WebSocket
    // framing; they write nothing and return 0.

    override size_t generateHeader(
        HTTPTransaction txn,
        HTTPMessage msg,
        HttpWriteBuffer buffer,
        bool eom = false)
    {
        return 0;
    }

    override size_t generateBody(HTTPTransaction txn,
        HttpWriteBuffer chain,in ubyte[] data,
        bool eom)
    {
        return 0;
    }

    override size_t generateChunkHeader(
        HTTPTransaction txn,
        HttpWriteBuffer buffer,
        size_t length)
    {
        return 0;
    }


    override size_t generateChunkTerminator(
        HTTPTransaction txn,
        HttpWriteBuffer buffer)
    {
        return 0;
    }

    override size_t generateEOM(HTTPTransaction txn,
        HttpWriteBuffer buffer)
    {
        return 0;
    }

    override size_t generateRstStream(HTTPTransaction txn,
        HttpWriteBuffer buffer,HTTPErrorCode code)
    {
        return 0;
    }

    /**
     * Serialises `data` as one or more WebSocket frames into `buffer`.
     *
     * Control frames (opcode bit 3 set) are truncated to 125 payload bytes.
     * Larger messages are split into frames of at most FRAME_SIZE_IN_BYTES;
     * the first frame carries `code`, the rest are continuation frames, and
     * only the last one has the FIN bit. Frames are masked when this codec
     * is on the UPSTREAM (client) side. Generating a close frame makes
     * `shouldClose()` return true.
     *
     * Returns: `buffer.length` after writing.
     */
    override size_t generateWsFrame(HTTPTransaction txn,
        HttpWriteBuffer buffer,OpCode code, ubyte[] data)
    {
        if((code & 0x08) == 0x08 && (data.length > 125))
            data = data[0 .. 125];
        if(code == OpCode.OpCodeClose)
            _shouldClose = true;

        // Round up so a trailing partial chunk gets its own frame instead of
        // being dropped; an empty message still produces one (empty) frame.
        size_t numFrames = (data.length + FRAME_SIZE_IN_BYTES - 1) / FRAME_SIZE_IN_BYTES;
        if (numFrames == 0)
            numFrames = 1;

        size_t bytesLeft = data.length;
        size_t bytesWritten = 0;
        for (size_t i = 0; i < numFrames; ++i)
        {
            const bool isLastFrame = (i == (numFrames - 1));
            const bool isFirstFrame = (i == 0);
            const OpCode opcode = isFirstFrame ? code : OpCode.OpCodeContinue;

            const size_t payloadLength = bytesLeft < FRAME_SIZE_IN_BYTES ? bytesLeft
                : FRAME_SIZE_IN_BYTES;
            ubyte[] chunk = data[bytesWritten .. (bytesWritten + payloadLength)];
            getFrameHeader(opcode, payloadLength, isLastFrame, buffer);
            if (doMask())
            {
                ubyte[4] mask = generateMaskingKey();
                buffer.write(mask[]);
                // Mask a private copy: the key index must be relative to the
                // start of the payload, and the caller's data stays untouched.
                ubyte[] masked = chunk.dup;
                foreach (k, ref b; masked)
                    b ^= mask[k % 4];
                buffer.write(masked);
            }
            else
            {
                buffer.write(chunk);
            }
            bytesLeft -= payloadLength;
            bytesWritten += payloadLength;
        }
        return buffer.length;
    }

    /**
     * Produces a fresh 4-byte masking key for a client-to-server frame.
     *
     * The key is drawn from the thread-local std.random generator so that it
     * differs per frame and is not derived from the clock.
     * NOTE(review): std.random is not a cryptographic RNG; swap in a CSPRNG
     * if strict unpredictability of the key is required.
     */
    ubyte[4] generateMaskingKey() // Client will used
    {
        import std.random : uniform;
        import std.bitmanip : nativeToBigEndian;
        return nativeToBigEndian(uniform!uint());
    }

protected:
    /// Only the client side (UPSTREAM direction) masks outgoing frames.
    bool doMask(){return _transportDirection == TransportDirection.UPSTREAM;}

    /**
     * Writes the frame header (FIN/opcode byte, mask bit and payload length
     * in its 7-bit, 16-bit or 64-bit form) into `buffer`. The masking key
     * itself is not written here.
     */
    void getFrameHeader(OpCode code, size_t payloadLength, bool lastFrame, HttpWriteBuffer buffer)
    {
        ubyte[2] wdata = [0, 0];
        wdata[0] = cast(ubyte)((code & 0x0F) | (lastFrame ? 0x80 : 0x00));
        if(doMask())
            wdata[1] = 0x80;
        if (payloadLength <= 125){
            wdata[1] |= to!ubyte(payloadLength);
            buffer.write(wdata[]);
        } else if (payloadLength <= ushort.max) {
            wdata[1] |= 126;
            buffer.write(wdata[]);
            ubyte[2] length = nativeToBigEndian(to!ushort(payloadLength));
            buffer.write(length[]);
        } else {
            wdata[1] |= 127;
            buffer.write(wdata[]);
            // The extended length field is always 8 bytes on the wire, so
            // widen explicitly; size_t is only 4 bytes on 32-bit targets.
            ubyte[8] length = nativeToBigEndian(cast(ulong) payloadLength);
            buffer.write(length[]);
        }
    }

    /**
     * Validates the header of the frame being parsed, using `frame` and the
     * current `_length`. On failure the close code and reason are stored in
     * the frame.
     *
     * Returns: the resulting `frame._isValid`.
     */
    bool checkValidity()
    {
        void setError(CloseCode code, string closeReason)
        {
            frame._closeCode = code;
            frame._closeReason = closeReason;
            frame._isValid = false;
        }

        if (frame._rsv1 || frame._rsv2 || frame._rsv3)
        {
            setError(CloseCode.CloseCodeProtocolError, "Rsv field is non-zero");
        }
        else if (isOpCodeReserved(frame._opCode))
        {
            setError(CloseCode.CloseCodeProtocolError, "Used reserved opcode");
        }
        else if (frame.isControlFrame())
        {
            if (_length > 125)
            {
                setError(CloseCode.CloseCodeProtocolError, "Control frame is larger than 125 bytes");
            }
            else if (!frame._isFinalFrame)
            {
                setError(CloseCode.CloseCodeProtocolError, "Control frames cannot be fragmented");
            }
            else
            {
                frame._isValid = true;
            }
        }
        else
        {
            frame._isValid = true;
        }
        return frame._isValid;
    }

    /// True for opcodes strictly between Binary and Close, or above Pong.
    bool isOpCodeReserved(OpCode code)
    {
        return ((code > OpCode.OpCodeBinary) && (code < OpCode.OpCodeClose))
            || (code > OpCode.OpCodePong);
    }

    /// Resets all per-frame parser state so the next byte is read as a header.
    pragma(inline)
    void clear()
    {
        _state = ProcessingState.PS_READ_HEADER;
        _mask[] = 0;
        _hasMask = false;
        _buffer[] = 0;
        _readLen = 0;
        _length = 0;
        frame = WSFrame();
    }

    /**
     * Incremental frame parser. Consumes all of `data`, carrying partial
     * state across calls, and invokes the callback's `onWsFrame` once per
     * completed or rejected frame.
     */
    void readFrame(in ubyte[] data)
    {
        // Finalises the current frame, hands it to the callback and resets
        // the parser for the next frame.
        void resultOne()
        {
            if (frame.isValid)
            {
                if (frame.isDataFrame())
                {
                    if (!frame.isContinuationFrame())
                    {
                        _lastcode = frame.opCode();
                    }
                    frame._lastCode = _lastcode;
                }
                // Unmask every valid frame, control frames included, so that
                // ping payloads and close reasons arrive in clear.
                if (_hasMask)
                {
                    for (size_t i = 0; i < _length; ++i)
                    {
                        frame.data[i] = frame.data[i] ^ _mask[i % 4];
                    }
                }
            }
            if(_callback)
                _callback.onWsFrame(_transaction,frame);
            clear();
        }

        // Called when header, length and mask are complete. An empty payload
        // is dispatched right away rather than waiting for the next byte.
        void beginPayload()
        {
            _readLen = 0;
            if (_length == 0)
                resultOne();
            else
                _state = ProcessingState.PS_READ_PAYLOAD;
        }

        // Called once an extended payload length has been decoded.
        // NOTE(review): _length comes straight from the peer and is not
        // capped before allocating; consider enforcing a maximum frame size.
        void lengthKnown()
        {
            frame.data = new ubyte[_length];
            _readLen = 0;
            if (_hasMask)
                _state = ProcessingState.PS_READ_MASK;
            else
                beginPayload();
        }

        const size_t len = data.length;
        for (size_t i = 0; i < len; ++i)
        {
            ubyte ch = data[i];
            final switch (_state)
            {
            case ProcessingState.PS_READ_HEADER:
                frame._isFinalFrame = (ch & 0x80) != 0;
                frame._rsv1 = ((ch & 0x40) != 0);
                frame._rsv2 = ((ch & 0x20) != 0);
                frame._rsv3 = ((ch & 0x10) != 0);
                frame._opCode = cast(OpCode)(ch & 0x0F);
                _state = ProcessingState.PS_READ_Do_LENGTH;
                break;
            case ProcessingState.PS_READ_Do_LENGTH:
                {
                    _hasMask = (ch & 0x80) != 0;
                    const tlen = (ch & 0x7F);
                    // Record the indicator before validating: 126/127 mean
                    // "more than 125 bytes", which a control frame must not be.
                    _length = tlen;
                    if (tlen < 126)
                        frame.data = new ubyte[_length];

                    if (!checkValidity())
                    {
                        resultOne();
                    }
                    else if (tlen == 126)
                    {
                        _state = ProcessingState.PS_READ_PAYLOAD_LENGTH;
                    }
                    else if (tlen == 127)
                    {
                        _state = ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH;
                    }
                    else if (_hasMask)
                    {
                        _state = ProcessingState.PS_READ_MASK;
                    }
                    else
                    {
                        beginPayload();
                    }
                }
                break;
            case ProcessingState.PS_READ_PAYLOAD_LENGTH:
                {
                    if (len - i >= 2)
                    {
                        ubyte[2] tlen = data[i .. (i + 2)];
                        ++i; // second length byte consumed here, the loop skips the first
                        _length = bigEndianToNative!(ushort)(tlen);
                        lengthKnown();
                    }
                    else
                    {
                        // Only one of the two bytes is available; stash it.
                        _buffer[] = 0;
                        _buffer[0] = ch;
                        _state = ProcessingState.PS_READ_PAYLOAD_LENGTH_1;
                    }
                }
                break;
            case ProcessingState.PS_READ_PAYLOAD_LENGTH_1:
                {
                    _buffer[1] = ch;
                    ubyte[2] tlen = _buffer[0 .. 2];
                    _length = bigEndianToNative!ushort(tlen);
                    lengthKnown();
                }
                break;
            case ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH:
                {
                    auto llen = len - i;
                    if (llen >= 8)
                    {
                        ubyte[8] tlen = data[i .. (i + 8)];
                        i += 7;
                        _length = cast(size_t) bigEndianToNative!ulong(tlen);
                        lengthKnown();
                    }
                    else
                    {
                        _buffer[] = 0;
                        _buffer[0 .. llen] = data[i .. $];
                        _readLen = llen;
                        i += llen;
                        _state = ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH_1;
                    }
                }
                break;
            case ProcessingState.PS_READ_BIG_PAYLOAD_LENGTH_1:
                {
                    auto llen = len - i;
                    auto rlen = 8 - _readLen;
                    if (llen >= rlen)
                    {
                        _buffer[_readLen .. 8] = data[i .. (i + rlen)];
                        i += rlen - 1;
                        _length = cast(size_t) bigEndianToNative!ulong(_buffer);
                        lengthKnown();
                    }
                    else
                    {
                        _buffer[_readLen .. (_readLen + llen)] = data[i .. $];
                        _readLen += llen;
                        i += llen;
                    }
                }
                break;
            case ProcessingState.PS_READ_MASK:
                {
                    auto llen = len - i;
                    if (llen >= 4)
                    {
                        _mask[] = data[i .. (i + 4)];
                        i += 3;
                        beginPayload();
                    }
                    else
                    {
                        _mask[] = 0;
                        _mask[0 .. llen] = data[i .. $];
                        _readLen = llen;
                        i += llen;
                        _state = ProcessingState.PS_READ_MASK_1;
                    }
                }
                break;
            case ProcessingState.PS_READ_MASK_1:
                {
                    auto llen = len - i;
                    auto rlen = 4 - _readLen;
                    if (llen >= rlen)
                    {
                        _mask[_readLen .. 4] = data[i .. (i + rlen)];
                        i += rlen - 1;
                        beginPayload();
                    }
                    else
                    {
                        _mask[_readLen .. (_readLen + llen)] = data[i .. $];
                        _readLen += llen;
                        i += llen;
                    }
                }
                break;
            case ProcessingState.PS_READ_PAYLOAD:
                {
                    if(_length>0) logDebugf("_length = %d / %d", _length, len);
                    auto llen = len - i;
                    // At least 1 here: empty payloads never enter this state.
                    auto rlen = _length - _readLen;
                    if (llen >= rlen)
                    {
                        frame.data[_readLen .. (_readLen + rlen)] = data[i .. (i + rlen)];
                        i += rlen - 1;
                        resultOne();
                    }
                    else
                    {
                        frame.data[_readLen .. (_readLen + llen)] = data[i .. $];
                        _readLen += llen;
                        // Everything left in this buffer has been consumed;
                        // skip it so it is not re-read as payload.
                        i += llen;
                    }
                }
                break;

            }
        }
    }

private:
    TransportDirection _transportDirection;
    bool _finished;
    bool _shouldClose = false;
    CallBack _callback;
    HTTPTransaction _transaction;

    ProcessingState _state;   // current parser state
    OpCode _lastcode;         // opcode of the most recent non-continuation data frame
    WSFrame frame;            // frame currently being assembled
    bool _hasMask;            // mask bit of the frame being parsed
    ubyte[4] _mask;           // masking key of the frame being parsed
    ubyte[8] _buffer;         // scratch space for a split extended-length field
    size_t _length;           // payload length of the frame being parsed
    size_t _readLen;          // bytes collected so far for the field being read
}